private Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> {{ name.lower_camel_case }} = create{{ name.upper_camel_case }}();

private Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> create{{ name.upper_camel_case }}() {
  {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request request
      = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder().build();

  Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> flowable
    = Flowable.create(emitter -> {
      scheduler.scheduleDirect(() -> {
        stub.subscribe{{ name.upper_camel_case }}(request,
            new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() {

          @Override
          public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response value) {
          {%- if has_result %}
          {{ plugin_name.upper_camel_case }}Result result = {{ plugin_name.upper_camel_case }}Result.translateFromRpc(value.get{{ plugin_name.upper_camel_case }}Result());

          switch (result.result) {
            case SUCCESS:
            case NEXT:
              {%- if return_type.is_repeated %}
                  {%- if return_type.is_primitive %}
              emitter.onNext(value.get{{ return_name.upper_camel_case }}List());
                  {%- else %}
              emitter.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name }}::translateFromRpc).collect(Collectors.toList()));
                  {%- endif %}
              {%- else %}
              emitter.onNext({{ return_type.name }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}()));
              {%- endif %}
              break;
            default:
              emitter.onError(new {{ plugin_name.upper_camel_case }}Exception(result.result, result.resultStr));
              break;
          }
          {%- else %}
            {%- if return_type.is_repeated %}
              {%- if return_type.is_primitive %}
          emitter.onNext(value.get{{ return_name.upper_camel_case }}List());
              {%- else %}
          emitter.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name }}::translateFromRpc).collect(Collectors.toList()));
              {%- endif %}
            {%- else %}
              {%- if return_type.is_primitive %}
          emitter.onNext(value.get{{ return_name.upper_camel_case }}());
              {%- else %}
          emitter.onNext({{ return_type.name }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}()));
              {%- endif %}
            {%- endif %}
          {%- endif %}
          }

          @Override
          public void onError(Throwable t) {
            emitter.onError(t);
          }

          @Override
          public void onCompleted() {
            emitter.onComplete();
          }
        });
      });
    }, BackpressureStrategy.BUFFER);

  return flowable
  .replay(1)
  .autoConnect();
}

public Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> get{{ name.upper_camel_case }}() {
  return {{ name.lower_camel_case }};
}
